package com.fintech.pangu.elasticsearch.service.impl;

import com.fintech.pangu.elasticsearch.constants.RuleConstant;
import com.fintech.pangu.elasticsearch.service.DocumentService;
import com.fintech.pangu.elasticsearch.service.IndexService;
import com.fintech.pangu.elasticsearch.validation.Update;
import com.fintech.pangu.elasticsearch.validation.ValidatorsUtil;
import com.fintech.pangu.elasticsearch.vo.SingleDocumentHandleVO;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * 文档操作服务
 *
 * @author xujunqi
 * @author chendongdong
 * @date 2019/1/23 14:39
 */
public class DocumentServiceImpl implements DocumentService {
    private final static Logger logger = LoggerFactory.getLogger(DocumentServiceImpl.class);

    private TransportClient transportClient;

    private IndexService indexService;

    public DocumentServiceImpl() {
    }

    public DocumentServiceImpl(TransportClient transportClient, IndexService indexService) {
        this.transportClient = transportClient;
        this.indexService = indexService;
    }

    public DocumentServiceImpl transportClient(TransportClient transportClient) {
        this.transportClient = transportClient;
        return this;
    }

    public DocumentServiceImpl indexService(IndexService indexService) {
        this.indexService = indexService;
        return this;
    }

    /**
     * 查询单个文档对象
     *
     * @param index 索引名称
     * @param type  文档类型名称
     * @param id    文档id
     * @return
     */
    @Override
    public String getSingleDocumentAsString(String index, String type, String id) throws Exception {
        GetResponse response = getSingleDocument(index, type, id);

        if (!response.isExists()) {
            logger.debug("查询单个文档[index={}, type={}, id={}], 对应的文档不存在", index, type, id);
            return null;
        }

        return response.getSourceAsString();
    }

    private GetResponse getSingleDocument(String index, String type, String id) throws Exception {
        singleDocumentParamInitCheck(index, type, id);

        GetResponse response = transportClient.prepareGet(index, type, id).get();

        return response;
    }

    /**
     * 查询多个文档对象
     *
     * @param index 索引名称
     * @param type  文档类型名称
     * @param ids   文档ids
     * @return
     */
    @Override
    public List<String> multiGetDocuments(String index, String type, String... ids) {
        Assert.notNull(index, "索引名称为空");
        Assert.notNull(ids, "文档ids为空");

        MultiGetResponse multiGetItemResponses = transportClient.prepareMultiGet()
                .add(index, type, ids)
                .get();

        List<String> documentList = new ArrayList<>();
        Integer succCounter = 0;
        Integer failCounter = 0;
        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            GetResponse response = itemResponse.getResponse();
            if (response != null && response.isExists()) {
                succCounter++;
                String json = response.getSourceAsString();
                documentList.add(json);
                logger.debug("多文档查询[index={}, type={}], 文档id={}查询成功", index, type, response.getId());
            } else {
                failCounter++;
                logger.debug("多文档查询[index={}, type={}], 文档id={}查询失败, 文档不存在", index, type, response);
            }
        }

        if (ids.length != succCounter) {
            logger.warn("多文档查询[index={}, type={}], 请求查询数为: {}, 成功: {}, 失败: {}", index, type, ids.length, succCounter, failCounter);
        }

        return documentList;
    }

    /**
     * 按照id删除单个文档
     *
     * @param index 索引名称
     * @param type  文档类型名称
     * @param id    文档id
     * @return
     */
    @Override
    public Boolean deleteSingleDocument(String index, String type, String id) throws Exception {
        singleDocumentParamInitCheck(index, type, id);

        DeleteResponse response = transportClient.prepareDelete(index, type, id).get();
        Boolean deleted = DocWriteResponse.Result.DELETED.equals(response.getResult());
        if (deleted) {
            logger.debug("删除单个文档[index={}, type={}, id={}]操作成功", index, type, id);
        } else {
            logger.warn("删除单个文档[index={}, type={}, id={}]操作失败, 返回错误原因: {} ", index, type, id, response.status());
        }

        return deleted;
    }

    /**
     * 单文档操作初始校验
     *
     * @param index
     * @param type  文档类型名称
     * @param id
     */
    @Override
    public void singleDocumentParamInitCheck(String index, String type, String id) throws Exception {
        Assert.notNull(index, "索引名称为空");
        Assert.notNull(type, "文档类型名称为空");
        Assert.notNull(id, "文档id为空");

        if (!indexService.indexExists(index)) {
            logger.warn("单个文档[index={}, type={}, id={}]操作, 索引不存在", index, type, id);
            throw new Exception("索引不存在");
        }
    }

    /**
     * 新增单个文档
     *
     * @param index      索引名称
     * @param jsonSource 文档JSON格式字符串
     * @return
     */
    @Override
    public String insertSingleDocument(String index, String jsonSource) throws Exception {
        return insertSingleDocument(index, index, jsonSource);
    }

    /**
     * 新增单个文档
     *
     * @param index      索引名称
     * @param type       文档类型
     * @param jsonSource 文档JSON格式字符串
     * @return
     */
    @Override
    public String insertSingleDocument(String index, String type, String jsonSource) throws Exception {
        return insertSingleDocument(index, type, null, jsonSource, XContentType.JSON);
    }

    /**
     * 新增单个文档
     *
     * @param index      索引名称
     * @param type       文档类型
     * @param id         文档id(调用方指定)
     * @param jsonSource 文档JSON格式字符串
     * @return
     */
    @Override
    public String insertSingleDocument(String index, String type, String id, String jsonSource) throws Exception {
        return insertSingleDocument(index, type, id, jsonSource, XContentType.JSON);
    }

    /**
     * 新增单个文档
     *
     * @param index        索引名称
     * @param type         文档类型
     * @param id           文档id, 为空时, 自动生成id
     * @param source       文档字符串
     * @param xContentType 文档文件格式类型
     * @return
     */
    private String insertSingleDocument(String index, String type, String id, String source, XContentType xContentType) throws Exception {
        Assert.notNull(index, "索引名称为空");
        Assert.notNull(type, "文档类型名称为空");

        if (!indexService.indexExists(index)) {
            logger.warn("新增单个文档[index={}, type={}, id={}]操作, 索引不存在", index, type, id);
            throw new Exception("索引不存在");
        }

        if (id != null) {
            GetResponse response = getSingleDocument(index, type, id);
            if (response != null && response.isExists()) {
                logger.warn("新增单个文档[index={}, type={}, id={}], 文档已经id存在, 请勿重复操作", index, type, id);
                throw new Exception("文档已经id存在, 请勿重复操作");
            }
        }

        logger.debug("新增单个文档[index={}, type={}, id={}", index, type, id);
        IndexResponse response = transportClient.prepareIndex(index, type, id).setSource(source, xContentType).get();
        logger.debug("单个文档[index={}, type={}, id={}]新增操作完成, 返回结果为: {}", index, type, id, response);

        if (!DocWriteResponse.Result.CREATED.equals(response.getResult())) {
            logger.warn("单个文档[index={}, type={}, id={}]新增失败, 返回结果为: {}", index, type, id, response);
            return null;
        }
        String docId = response.getId();
        logger.info("单个文档[index={}, type={}, id={}, docId={}]新增成功", index, type, id, docId);

        return docId;
    }


    /**
     * 修改单个文档
     *
     * @param singleDocumentHandleVO 单个文档操作请求VO类对象
     * @throws Exception
     **/
    @Override
    public String updateSingleDocument(SingleDocumentHandleVO singleDocumentHandleVO) throws Exception {
        //参数校验
        ValidatorsUtil.validateWithException(singleDocumentHandleVO, Update.class);
        //索引名称
        String index = singleDocumentHandleVO.getIndex();
        //类型名称
        String type = singleDocumentHandleVO.getType();
        //文档ID
        String id = singleDocumentHandleVO.getId();
        //操作模式
        String operateMode = singleDocumentHandleVO.getOperateMode();
        //元数据
        String source = singleDocumentHandleVO.getSource();
        //script脚本
        String script = singleDocumentHandleVO.getScript();
        if (RuleConstant.SOURCE_MODE.equals(operateMode)) {
            Assert.hasText(source, "元数据[source]不能为空");
        } else if (RuleConstant.SCRIPT_MODE.equals(operateMode)) {
            Assert.hasText(script, "script脚本[script]不能为空");
        }
        //内容类型
        XContentType xcontentType = singleDocumentHandleVO.getXcontentType();
        xcontentType = ObjectUtils.isEmpty(xcontentType) ? XContentType.JSON : xcontentType;
        //判断索引是否存在
        if (!indexService.indexExists(index)) {
            logger.warn("修改单个文档[index={}, type={}, id={}]操作, 索引不存在", index, type, id);
            throw new Exception("索引不存在");
        }
        //判断指定ID的文档是否存在
        GetResponse getResponse = transportClient.prepareGet(index, type, id).get();
        if (!getResponse.isExists()) {
            logger.warn("查询单个文档[index={}, type={}, id={}], 对应的文档不存在", index, type, id);
            throw new Exception("文档不存在");
        }
        //更新操作
        UpdateRequestBuilder updateRequestBuilder = transportClient.prepareUpdate(index, type, id);
        UpdateResponse updateResponse = null;
        if (RuleConstant.SOURCE_MODE.equals(operateMode)) {
            updateResponse = updateRequestBuilder.setDoc(source, xcontentType).get();
        } else if (RuleConstant.SCRIPT_MODE.equals(operateMode)) {
            updateResponse = updateRequestBuilder.setScript(new Script(script)).get();
        }
        if (null != updateResponse) {
            DocWriteResponse.Result result = updateResponse.getResult();
            if (!DocWriteResponse.Result.UPDATED.equals(result)) {
                logger.warn("单个文档[index={}, type={}, id={}]修改失败, 返回结果为: {}", index, type, id, updateResponse);
                return null;
            }
            String docId = updateResponse.getId();
            logger.info("单个文档[index={}, type={}, id={}, docId={}]修改成功", index, type, id, docId);
        }
        return null;
    }
}
